-
Notifications
You must be signed in to change notification settings - Fork 9
/
test_did_exchange.py
190 lines (164 loc) · 6.83 KB
/
test_did_exchange.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import asyncio
from typing import Optional
import pytest
from aries_cloudcontroller import AcaPyClient
from assertpy import assert_that
from fastapi import HTTPException
from app.routes.admin.tenants import router as tenants_router
from app.routes.connections import router as connections_router
from app.services import acapy_wallet
from app.tests.util.webhooks import check_webhook_state, get_wallet_id_from_async_client
from app.util.did import qualified_did_sov
from shared import RichAsyncClient
CONNECTIONS_BASE_PATH = connections_router.prefix
TENANTS_BASE_PATH = tenants_router.prefix
@pytest.mark.anyio
@pytest.mark.parametrize(
"use_did,use_did_method,use_public_did",
[
(None, None, False),
(True, None, False),
(None, "did:peer:2", False),
(None, "did:peer:4", False),
(True, "did:peer:4", False),
(None, None, True),
],
)
async def test_create_did_exchange_request(
alice_member_client: RichAsyncClient,
faber_client: RichAsyncClient,
alice_acapy_client: AcaPyClient,
faber_acapy_client: AcaPyClient,
use_did: Optional[str],
use_did_method: Optional[str],
use_public_did: bool,
):
faber_public_did = await acapy_wallet.get_public_did(controller=faber_acapy_client)
request_data = {"their_public_did": qualified_did_sov(faber_public_did.did)}
if use_did:
new_did = await acapy_wallet.create_did(controller=alice_acapy_client)
request_data["use_did"] = new_did.did
if use_did_method:
request_data["use_did_method"] = use_did_method
if use_public_did:
request_data["use_public_did"] = use_public_did
if use_public_did: # Alice doesn't have a public DID
with pytest.raises(HTTPException) as exc_info:
response = await alice_member_client.post(
f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request",
params=request_data,
)
assert exc_info.value.status_code == 400
assert exc_info.value.detail == """{"detail":"No public DID configured."}"""
elif use_did and use_did_method:
with pytest.raises(HTTPException) as exc_info:
await alice_member_client.post(
f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request",
params=request_data,
)
assert exc_info.value.status_code == 400
assert (
exc_info.value.detail
== """{"detail":"Cannot specify both use_did and use_did_method."}"""
)
else:
response = await alice_member_client.post(
f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request", params=request_data
)
assert response.status_code == 200
connection_record = response.json()
assert_that(connection_record).contains("connection_id", "state")
assert_that(connection_record["state"]).is_equal_to("request-sent")
alice_connection_id = connection_record["connection_id"]
alice_did = connection_record["my_did"]
try:
# Due to auto-accepts, Alice's connection is complete
assert await check_webhook_state(
alice_member_client,
topic="connections",
state="completed",
filter_map={"connection_id": alice_connection_id},
)
# Faber now has a complete connection too
assert await check_webhook_state(
faber_client,
topic="connections",
state="completed",
filter_map={"their_did": alice_did},
)
finally:
await asyncio.sleep(1) # Short sleep assists in avoiding 500 error
# Delete connection records:
await alice_member_client.delete(
f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
)
@pytest.mark.anyio
@pytest.mark.skip(
"Changing Faber wallet settings during xdist run can break other tests. "
"This test works in isolation. Should be refactored to run in parallel."
)
@pytest.mark.parametrize("use_public_did", [False])
async def test_accept_did_exchange_invitation(
alice_member_client: RichAsyncClient,
faber_client: RichAsyncClient,
tenant_admin_client: RichAsyncClient,
faber_acapy_client: AcaPyClient,
use_public_did: bool,
):
# First disable auto-accept invites for Faber
faber_wallet_id = get_wallet_id_from_async_client(client=faber_client)
await tenant_admin_client.put(
f"{TENANTS_BASE_PATH}/{faber_wallet_id}",
json={"extra_settings": {"ACAPY_AUTO_ACCEPT_REQUESTS": False}},
)
faber_public_did = await acapy_wallet.get_public_did(controller=faber_acapy_client)
request_data = {"their_public_did": qualified_did_sov(faber_public_did.did)}
alice_create_request_response = await alice_member_client.post(
f"{CONNECTIONS_BASE_PATH}/did-exchange/create-request", params=request_data
)
alice_create_request_response = alice_create_request_response.json()
alice_connection_id = alice_create_request_response["connection_id"]
alice_did = alice_create_request_response["my_did"]
try:
faber_connection_request_received_event = await check_webhook_state(
faber_client,
topic="connections",
state="request-received",
filter_map={"their_did": alice_did},
)
faber_connection_id = faber_connection_request_received_event["connection_id"]
accept_params = {
"connection_id": faber_connection_id,
"use_public_did": use_public_did,
}
faber_accept_request_response = await faber_client.post(
f"{CONNECTIONS_BASE_PATH}/did-exchange/accept-request", params=accept_params
)
assert faber_accept_request_response.status_code == 200
accept_response = faber_accept_request_response.json()
assert accept_response["state"] == "response-sent"
# Now Alice's connection is complete
assert await check_webhook_state(
alice_member_client,
topic="connections",
state="completed",
filter_map={"connection_id": alice_connection_id},
)
# And Faber's connection is complete
assert await check_webhook_state(
faber_client,
topic="connections",
state="completed",
filter_map={"connection_id": faber_connection_id},
)
finally:
await asyncio.sleep(1) # Short sleep assists in avoiding 500 error
# Delete connection records:
await alice_member_client.delete(
f"{CONNECTIONS_BASE_PATH}/{alice_connection_id}"
)
# Reconfigure ACAPY_AUTO_ACCEPT_REQUESTS for Faber
await tenant_admin_client.put(
f"{TENANTS_BASE_PATH}/{faber_wallet_id}",
json={"extra_settings": {"ACAPY_AUTO_ACCEPT_REQUESTS": True}},
)