Skip to content

Commit

Permalink
minor cleanups; increase test coverage
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Whitehead <cywolf@gmail.com>
  • Loading branch information
andrewwhitehead committed Aug 18, 2021
1 parent c9f0aa5 commit a06d3a6
Show file tree
Hide file tree
Showing 4 changed files with 367 additions and 55 deletions.
332 changes: 311 additions & 21 deletions aries_cloudagent/askar/didcomm/tests/test_v2.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
from aries_askar.store import Session
import json

from asynctest import mock as async_mock
import pytest

from aries_askar import Key, KeyAlg
from aries_askar import AskarError, Key, KeyAlg, Session

from ....config.injection_context import InjectionContext
from ....utils.jwe import b64url, JweEnvelope

from ...profile import AskarProfileManager
from .. import v2 as test_module


ALICE_KID = "did:example:alice#key-1"
BOB_KID = "did:example:bob#key-1"
MESSAGE = b"Expecto patronum"


@pytest.fixture()
async def session():
context = InjectionContext()
Expand All @@ -33,45 +41,327 @@ async def test_es_round_trip(self, session: Session):
alg = KeyAlg.X25519
bob_sk = Key.generate(alg)
bob_pk = Key.from_jwk(bob_sk.get_jwk_public())
bob_kid = "did:example:bob#key-1"
message = b"Expecto patronum"

enc_message = test_module.ecdh_es_encrypt({bob_kid: bob_pk}, message)
enc_message = test_module.ecdh_es_encrypt({BOB_KID: bob_pk}, MESSAGE)

# receiver must have the private keypair accessible
await session.insert_key("my_sk", bob_sk, tags={"kid": bob_kid})
await session.insert_key("my_sk", bob_sk, tags={"kid": BOB_KID})

plaintext, recip_kid, sender_kid = await test_module.unpack_message(
session, enc_message
)
assert recip_kid == bob_kid
assert recip_kid == BOB_KID
assert sender_kid is None
assert plaintext == message
assert plaintext == MESSAGE

@pytest.mark.asyncio
async def test_es_encrypt_x(self, session: Session):
alg = KeyAlg.X25519
bob_sk = Key.generate(alg)
bob_pk = Key.from_jwk(bob_sk.get_jwk_public())

with pytest.raises(
test_module.DidcommEnvelopeError, match="No message recipients"
):
_ = test_module.ecdh_es_encrypt({}, MESSAGE)

alt_sk = Key.generate(KeyAlg.P256)
alt_pk = Key.from_jwk(alt_sk.get_jwk_public())
with pytest.raises(
test_module.DidcommEnvelopeError, match="key types must be consistent"
):
_ = test_module.ecdh_es_encrypt({BOB_KID: bob_pk, "alt": alt_pk}, MESSAGE)

with async_mock.patch(
"aries_askar.Key.generate",
async_mock.MagicMock(side_effect=AskarError(99, "")),
):
with pytest.raises(
test_module.DidcommEnvelopeError,
match="Error creating content encryption key",
):
_ = test_module.ecdh_es_encrypt({BOB_KID: bob_pk}, MESSAGE)

with async_mock.patch(
"aries_askar.Key.aead_encrypt",
async_mock.MagicMock(side_effect=AskarError(99, "")),
):
with pytest.raises(
test_module.DidcommEnvelopeError,
match="Error encrypting",
):
_ = test_module.ecdh_es_encrypt({BOB_KID: bob_pk}, MESSAGE)

@pytest.mark.asyncio
async def test_es_decrypt_x(self):
alg = KeyAlg.X25519
bob_sk = Key.generate(alg)
bob_pk = Key.from_jwk(bob_sk.get_jwk_public())

message_unknown_alg = JweEnvelope(
protected={"alg": "NOT-SUPPORTED"},
)
with pytest.raises(
test_module.DidcommEnvelopeError,
match="Unsupported ECDH-ES algorithm",
):
_ = test_module.ecdh_es_decrypt(message_unknown_alg, bob_sk, b"0000")

message_unknown_enc = JweEnvelope(
protected={"alg": "ECDH-ES+A128KW", "enc": "UNKNOWN"},
)
with pytest.raises(
test_module.DidcommEnvelopeError,
match="Unsupported ECDH-ES content encryption",
):
_ = test_module.ecdh_es_decrypt(message_unknown_enc, bob_sk, b"0000")

message_invalid_epk = JweEnvelope(
protected={"alg": "ECDH-ES+A128KW", "enc": "A256GCM", "epk": {}},
)
with pytest.raises(
test_module.DidcommEnvelopeError,
match="Error loading ephemeral key",
):
_ = test_module.ecdh_es_decrypt(message_invalid_epk, bob_sk, b"0000")

@pytest.mark.asyncio
async def test_1pu_round_trip(self, session: Session):
alg = KeyAlg.X25519
alice_sk = Key.generate(alg)
alice_pk = Key.from_jwk(alice_sk.get_jwk_public())
alice_kid = "did:example:alice#key-1"
bob_sk = Key.generate(alg)
bob_pk = Key.from_jwk(bob_sk.get_jwk_public())
bob_kid = "did:example:bob#key-1"
alice_pk, bob_pk = alice_sk, bob_sk
message = b"Expecto patronum"

# receiver must have the private keypair accessible
await session.insert_key("my_sk", bob_sk, tags={"kid": bob_kid})
# for now at least, insert the sender public key so it can be resolved
await session.insert_key("alice_pk", alice_pk, tags={"kid": alice_kid})

enc_message = test_module.ecdh_1pu_encrypt(
{bob_kid: bob_pk}, alice_kid, alice_sk, message
{BOB_KID: bob_pk}, ALICE_KID, alice_sk, MESSAGE
)

# receiver must have the private keypair accessible
await session.insert_key("my_sk", bob_sk, tags={"kid": BOB_KID})
# for now at least, insert the sender public key so it can be resolved
await session.insert_key("alice_pk", alice_pk, tags={"kid": ALICE_KID})

plaintext, recip_kid, sender_kid = await test_module.unpack_message(
session, enc_message
)
assert recip_kid == bob_kid
assert sender_kid == alice_kid
assert plaintext == message
assert recip_kid == BOB_KID
assert sender_kid == ALICE_KID
assert plaintext == MESSAGE

@pytest.mark.asyncio
async def test_1pu_encrypt_x(self, session: Session):
alg = KeyAlg.X25519
alice_sk = Key.generate(alg)
bob_sk = Key.generate(alg)
bob_pk = Key.from_jwk(bob_sk.get_jwk_public())

with pytest.raises(
test_module.DidcommEnvelopeError, match="No message recipients"
):
_ = test_module.ecdh_1pu_encrypt({}, ALICE_KID, alice_sk, MESSAGE)

alt_sk = Key.generate(KeyAlg.P256)
alt_pk = Key.from_jwk(alt_sk.get_jwk_public())
with pytest.raises(
test_module.DidcommEnvelopeError, match="key types must be consistent"
):
_ = test_module.ecdh_1pu_encrypt(
{BOB_KID: bob_pk, "alt": alt_pk}, ALICE_KID, alice_sk, MESSAGE
)

with async_mock.patch(
"aries_askar.Key.generate",
async_mock.MagicMock(side_effect=AskarError(99, "")),
):
with pytest.raises(
test_module.DidcommEnvelopeError,
match="Error creating content encryption key",
):
_ = test_module.ecdh_1pu_encrypt(
{BOB_KID: bob_pk}, ALICE_KID, alice_sk, MESSAGE
)

with async_mock.patch(
"aries_askar.Key.aead_encrypt",
async_mock.MagicMock(side_effect=AskarError(99, "")),
):
with pytest.raises(
test_module.DidcommEnvelopeError,
match="Error encrypting",
):
_ = test_module.ecdh_1pu_encrypt(
{BOB_KID: bob_pk}, ALICE_KID, alice_sk, MESSAGE
)

@pytest.mark.asyncio
async def test_1pu_decrypt_x(self):
alg = KeyAlg.X25519
alice_sk = Key.generate(alg)
alice_pk = Key.from_jwk(alice_sk.get_jwk_public())
bob_sk = Key.generate(alg)
bob_pk = Key.from_jwk(bob_sk.get_jwk_public())

message_unknown_alg = JweEnvelope(
protected={"alg": "NOT-SUPPORTED"},
)
with pytest.raises(
test_module.DidcommEnvelopeError,
match="Unsupported ECDH-1PU algorithm",
):
_ = test_module.ecdh_1pu_decrypt(
message_unknown_alg, alice_pk, bob_sk, b"0000"
)

message_unknown_enc = JweEnvelope(
protected={"alg": "ECDH-1PU+A128KW", "enc": "UNKNOWN"},
)
with pytest.raises(
test_module.DidcommEnvelopeError,
match="Unsupported ECDH-1PU content encryption",
):
_ = test_module.ecdh_1pu_decrypt(
message_unknown_enc, alice_pk, bob_sk, b"0000"
)

message_invalid_epk = JweEnvelope(
protected={"alg": "ECDH-1PU+A128KW", "enc": "A256CBC-HS512", "epk": {}},
)
with pytest.raises(
test_module.DidcommEnvelopeError,
match="Error loading ephemeral key",
):
_ = test_module.ecdh_1pu_decrypt(
message_invalid_epk, alice_pk, bob_sk, b"0000"
)

@pytest.mark.asyncio
async def test_unpack_message_any_x(self, session: Session):
message_invalid = "{}"

with pytest.raises(
test_module.DidcommEnvelopeError,
match="Invalid",
):
_ = await test_module.unpack_message(session, message_invalid)

message_unknown_alg = json.dumps(
{
"protected": b64url(json.dumps({"alg": "NOT-SUPPORTED"})),
"recipients": [{"header": {"kid": "bob"}, "encrypted_key": "MTIzNA"}],
"iv": "MTIzNA",
"ciphertext": "MTIzNA",
"tag": "MTIzNA",
}
)

with pytest.raises(
test_module.DidcommEnvelopeError,
match="Unsupported DIDComm encryption",
):
_ = await test_module.unpack_message(session, message_unknown_alg)

message_unknown_recip = json.dumps(
{
"protected": b64url(json.dumps({"alg": "ECDH-ES+A128KW"})),
"recipients": [{"header": {"kid": "bob"}, "encrypted_key": "MTIzNA"}],
"iv": "MTIzNA",
"ciphertext": "MTIzNA",
"tag": "MTIzNA",
}
)

with pytest.raises(
test_module.DidcommEnvelopeError,
match="No recognized recipient key",
):
_ = await test_module.unpack_message(session, message_unknown_recip)

@pytest.mark.asyncio
async def test_unpack_message_1pu_x(self, session: Session):
alg = KeyAlg.X25519
alice_sk = Key.generate(alg)
alice_pk = Key.from_jwk(alice_sk.get_jwk_public())
bob_sk = Key.generate(alg)
bob_pk = Key.from_jwk(bob_sk.get_jwk_public())

# receiver must have the private keypair accessible
await session.insert_key("my_sk", bob_sk, tags={"kid": BOB_KID})
# for now at least, insert the sender public key so it can be resolved
await session.insert_key("alice_pk", alice_pk, tags={"kid": ALICE_KID})

message_1pu_no_skid = json.dumps(
{
"protected": b64url(json.dumps({"alg": "ECDH-1PU+A128KW"})),
"recipients": [{"header": {"kid": BOB_KID}, "encrypted_key": "MTIzNA"}],
"iv": "MTIzNA",
"ciphertext": "MTIzNA",
"tag": "MTIzNA",
}
)

with pytest.raises(
test_module.DidcommEnvelopeError,
match="Sender key ID not provided",
):
_ = await test_module.unpack_message(session, message_1pu_no_skid)

message_1pu_unknown_skid = json.dumps(
{
"protected": b64url(
json.dumps({"alg": "ECDH-1PU+A128KW", "skid": "UNKNOWN"})
),
"recipients": [{"header": {"kid": BOB_KID}, "encrypted_key": "MTIzNA"}],
"iv": "MTIzNA",
"ciphertext": "MTIzNA",
"tag": "MTIzNA",
}
)

with pytest.raises(
test_module.DidcommEnvelopeError,
match="Sender public key not found",
):
_ = await test_module.unpack_message(session, message_1pu_unknown_skid)

message_1pu_apu_invalid = json.dumps(
{
"protected": b64url(
json.dumps({"alg": "ECDH-1PU+A128KW", "skid": "A", "apu": "A"})
),
"recipients": [{"header": {"kid": BOB_KID}, "encrypted_key": "MTIzNA"}],
"iv": "MTIzNA",
"ciphertext": "MTIzNA",
"tag": "MTIzNA",
}
)

with pytest.raises(
test_module.DidcommEnvelopeError,
match="Invalid apu value",
):
_ = await test_module.unpack_message(session, message_1pu_apu_invalid)

message_1pu_apu_mismatch = json.dumps(
{
"protected": b64url(
json.dumps(
{
"alg": "ECDH-1PU+A128KW",
"skid": ALICE_KID,
"apu": b64url("UNKNOWN"),
}
)
),
"recipients": [{"header": {"kid": BOB_KID}, "encrypted_key": "MTIzNA"}],
"iv": "MTIzNA",
"ciphertext": "MTIzNA",
"tag": "MTIzNA",
}
)

with pytest.raises(
test_module.DidcommEnvelopeError,
match="Mismatch between skid and apu",
):
_ = await test_module.unpack_message(session, message_1pu_apu_mismatch)
Loading

0 comments on commit a06d3a6

Please sign in to comment.