-
Notifications
You must be signed in to change notification settings - Fork 9
/
acapy_ledger.py
200 lines (167 loc) · 6.27 KB
/
acapy_ledger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
from typing import Optional, Tuple
from aries_cloudcontroller import (
AcaPyClient,
GetDIDEndpointResponse,
SchemaGetResult,
TAAAccept,
TAAInfo,
TAARecord,
TxnOrRegisterLedgerNymResponse,
)
from app.exceptions import CloudApiException, handle_acapy_call
from shared.log_config import get_logger
logger = get_logger(__name__)
async def get_taa(controller: AcaPyClient) -> Tuple[TAAInfo, str]:
    """
    Fetch the Transaction Author Agreement (TAA) info from the ledger

    Parameters:
    -----------
    controller: AcaPyClient
        The aries_cloudcontroller object

    Returns:
    --------
    taa: Tuple[TAAInfo, str]
        The TAAInfo object, together with the acceptance mechanism. The
        mechanism is read from the recorded acceptance when one exists,
        otherwise it is "service_agreement".

    Raises:
    -------
    CloudApiException
        If the response carries no TAA info, or if the TAA is marked as
        required but no TAA record is present.
    """
    logger.debug("Fetching TAA")
    response = await handle_acapy_call(
        logger=logger, acapy_call=controller.ledger.fetch_taa
    )
    info = response.result

    # Unusable when nothing came back, or when acceptance is required but
    # there is no record that could be accepted.
    unusable = not info or (not info.taa_record and info.taa_required)
    if unusable:
        logger.error("Failed to get TAA. Received info: `{}`.", info)
        raise CloudApiException("Something went wrong. Could not get TAA.")

    logger.debug("Successfully fetched TAA info: {}", info)

    if info.taa_accepted:
        mechanism = info.taa_accepted.mechanism
    else:
        mechanism = "service_agreement"

    return info, mechanism
async def accept_taa(
    controller: AcaPyClient, taa: TAARecord, mechanism: Optional[str] = None
) -> None:
    """
    Accept the given TAA on the ledger

    Parameters:
    -----------
    controller: AcaPyClient
        The aries_cloudcontroller object
    taa: TAARecord
        The TAA record we want to agree to
    mechanism: Optional[str]
        An optional acceptance mechanism to send along with the record

    Raises:
    -------
    CloudApiException
        If the accept call fails. The new exception embeds the detail of
        the original one and is raised without an explicit status code.
    """
    logger.bind(body=taa).debug("Accepting TAA")

    # The accept request is the record's own fields plus the mechanism.
    record_fields = taa.to_dict()
    payload = TAAAccept(**record_fields, mechanism=mechanism)

    try:
        await handle_acapy_call(
            logger=logger, acapy_call=controller.ledger.accept_taa, body=payload
        )
    except CloudApiException as err:
        logger.error("An exception occurred while trying to accept TAA.")
        raise CloudApiException(
            f"An unexpected error occurred while trying to accept TAA: {err.detail}"
        ) from err
    else:
        logger.debug("Successfully accepted TAA.")
async def get_did_endpoint(
    controller: AcaPyClient, issuer_nym: str
) -> GetDIDEndpointResponse:
    """
    Look up the endpoint registered for a DID on the ledger

    Parameters:
    -----------
    controller: AcaPyClient
        The aries_cloudcontroller object
    issuer_nym: str
        The issuer's Verinym; passed to the ledger call as the `did`

    Returns:
    --------
    GetDIDEndpointResponse
        The response from getting the public endpoint associated with
        the issuer's Verinym from the ledger

    Raises:
    -------
    CloudApiException
        With status code 404 when the ledger call yields an empty response.
    """
    log = logger.bind(body={"issuer_nym": issuer_nym})
    log.debug("Fetching DID endpoint")

    # NOTE(review): the unbound module logger is handed to handle_acapy_call,
    # so anything it logs will not carry the issuer_nym context - confirm
    # whether that is intentional.
    endpoint_response = await handle_acapy_call(
        logger=logger, acapy_call=controller.ledger.get_did_endpoint, did=issuer_nym
    )

    if endpoint_response:
        log.debug("Successfully fetched DID endpoint.")
        return endpoint_response

    log.info("Failed to get DID endpoint; received empty response.")
    raise CloudApiException("Could not obtain issuer endpoint.", 404)
async def register_nym_on_ledger(
    aries_controller: AcaPyClient,
    *,
    did: str,
    verkey: str,
    alias: Optional[str] = None,
    role: Optional[str] = None,
    connection_id: Optional[str] = None,
    create_transaction_for_endorser: Optional[str] = None,
) -> TxnOrRegisterLedgerNymResponse:
    """
    Register a NYM (DID and verkey) on the ledger

    Parameters:
    -----------
    aries_controller: AcaPyClient
        The aries_cloudcontroller object
    did: str
        The DID to register
    verkey: str
        The verification key to register for the DID
    alias: Optional[str]
        Forwarded to the ledger call as `alias`
    role: Optional[str]
        Forwarded to the ledger call as `role`
    connection_id: Optional[str]
        Forwarded to the ledger call as `conn_id`
    create_transaction_for_endorser: Optional[str]
        Forwarded unchanged to the ledger call

    Returns:
    --------
    TxnOrRegisterLedgerNymResponse
        The response of the register call

    Raises:
    -------
    CloudApiException
        If the register call fails; the original detail is embedded in the
        message and the original status code is preserved.
    """
    log = logger.bind(body={"did": did})
    log.info("Registering NYM on ledger")

    # Note the rename: our `connection_id` is the ledger API's `conn_id`.
    nym_arguments = {
        "did": did,
        "verkey": verkey,
        "alias": alias,
        "role": role,
        "conn_id": connection_id,
        "create_transaction_for_endorser": create_transaction_for_endorser,
    }

    # NOTE(review): the unbound module logger is handed to handle_acapy_call,
    # so anything it logs will not carry the did context - confirm whether
    # that is intentional.
    try:
        result = await handle_acapy_call(
            logger=logger,
            acapy_call=aries_controller.ledger.register_nym,
            **nym_arguments,
        )
    except CloudApiException as err:
        raise CloudApiException(
            f"Error registering NYM on ledger: {err.detail}", err.status_code
        ) from err

    log.debug("Successfully registered NYM on ledger.")
    return result
async def accept_taa_if_required(aries_controller: AcaPyClient) -> None:
    """
    Fetch the TAA and accept it only when the ledger marks it as required

    Parameters:
    -----------
    aries_controller: AcaPyClient
        The aries_cloudcontroller object
    """
    taa_info, mechanism = await get_taa(aries_controller)

    if not taa_info.taa_required:
        # Nothing to accept on this ledger.
        return

    await accept_taa(
        controller=aries_controller,
        taa=taa_info.taa_record,
        mechanism=mechanism,
    )
async def schema_id_from_credential_definition_id(
    controller: AcaPyClient, credential_definition_id: str
) -> str:
    """
    From a credential definition, get the identifier for its schema.

    Taken from ACA-Py implementation:
    https://github.com/openwallet-foundation/acapy/blob/f9506df755e46c5be93b228c8811276b743a1adc/aries_cloudagent/ledger/indy.py#L790

    Parameters:
    ----------
    controller: AcaPyClient
        The aries_cloudcontroller object
    credential_definition_id: The identifier of the credential definition
        from which to identify a schema

    Returns:
    -------
    schema_id : string
        For an 8-token credential definition id the schema id is rebuilt
        from the id itself; otherwise the fourth token is treated as the
        schema sequence number and the schema is fetched to read its id.

    Raises:
    ------
    CloudApiException
        With status code 400 when the credential definition id has fewer
        than four ":"-separated tokens, and with status code 404 when no
        schema (or no schema id) is returned for the sequence number.
    """
    bound_logger = logger.bind(
        body={"credential_definition_id": credential_definition_id}
    )
    bound_logger.debug("Getting schema id from credential definition id")

    # scrape schema id or sequence number from cred def id
    tokens = credential_definition_id.split(":")

    # The schema reference lives at 0-based position 3; without it there is
    # nothing to resolve. Fail with a client error instead of an IndexError.
    if len(tokens) < 4:
        bound_logger.info("Malformed credential definition id; too few tokens.")
        raise CloudApiException(
            f"Invalid credential definition id: `{credential_definition_id}`.", 400
        )

    if len(tokens) == 8:  # node protocol >= 1.4: cred def id has 5 or 8 tokens
        bound_logger.debug("Constructed schema id from credential definition.")
        return ":".join(tokens[3:7])  # schema id spans 0-based positions 3-6

    # get txn by sequence number, retrieve schema identifier components
    seq_no = tokens[3]

    bound_logger.debug("Fetching schema using sequence number: `{}`", seq_no)
    # NOTE(review): the unbound module logger is handed to handle_acapy_call,
    # so anything it logs will not carry the credential_definition_id
    # context - confirm whether that is intentional.
    schema: SchemaGetResult = await handle_acapy_call(
        logger=logger, acapy_call=controller.schema.get_schema, schema_id=seq_no
    )

    if not schema.var_schema or not schema.var_schema.id:
        bound_logger.warning("No schema found with sequence number: `{}`.", seq_no)
        raise CloudApiException(f"Schema with id {seq_no} not found.", 404)

    bound_logger.debug("Successfully obtained schema id from credential definition.")
    return schema.var_schema.id