Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes in Endorser Protocol #1134

Merged
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from asynctest import (
mock as async_mock,
TestCase as AsyncTestCase,
)

from ......connections.models.conn_record import ConnRecord
from ......messaging.request_context import RequestContext
from ......messaging.responder import MockResponder
from ......transport.inbound.receipt import MessageReceipt

from ...handlers import transaction_acknowledgement_handler as test_module
from ...messages.transaction_acknowledgement import TransactionAcknowledgement


class TestTransactionAcknowledgementHandler(AsyncTestCase):
async def test_called(self):
request_context = RequestContext.test_context()
request_context.message_receipt = MessageReceipt()

with async_mock.patch.object(
test_module, "TransactionManager", autospec=True
) as mock_tran_mgr:
mock_tran_mgr.return_value.receive_transaction_acknowledgement = (
async_mock.CoroutineMock()
)
request_context.message = TransactionAcknowledgement()
request_context.connection_record = ConnRecord(
connection_id="b5dc1636-a19a-4209-819f-e8f9984d9897"
)
request_context.connection_ready = True
handler = test_module.TransactionAcknowledgementHandler()
responder = MockResponder()
await handler.handle(request_context, responder)

mock_tran_mgr.return_value.receive_transaction_acknowledgement.assert_called_once_with(
request_context.message, request_context.connection_record.connection_id
)
assert not responder.messages

async def test_called_not_ready(self):
request_context = RequestContext.test_context()
request_context.message_receipt = MessageReceipt()
request_context.connection_record = async_mock.MagicMock()

with async_mock.patch.object(
test_module, "TransactionManager", autospec=True
) as mock_tran_mgr:
mock_tran_mgr.return_value.receive_transaction_acknowledgement = (
async_mock.CoroutineMock()
)
request_context.message = TransactionAcknowledgement()
request_context.connection_ready = False
handler = test_module.TransactionAcknowledgementHandler()
responder = MockResponder()
with self.assertRaises(test_module.HandlerException):
await handler.handle(request_context, responder)

assert not responder.messages

async def test_called_x(self):
request_context = RequestContext.test_context()
request_context.message_receipt = MessageReceipt()

with async_mock.patch.object(
test_module, "TransactionManager", autospec=True
) as mock_tran_mgr:
mock_tran_mgr.return_value.receive_transaction_acknowledgement = (
async_mock.CoroutineMock(
side_effect=test_module.TransactionManagerError()
)
)
request_context.message = TransactionAcknowledgement()
request_context.connection_record = ConnRecord(
connection_id="b5dc1636-a19a-4209-819f-e8f9984d9897"
)
request_context.connection_ready = True
handler = test_module.TransactionAcknowledgementHandler()
responder = MockResponder()
await handler.handle(request_context, responder)

mock_tran_mgr.return_value.receive_transaction_acknowledgement.assert_called_once_with(
request_context.message, request_context.connection_record.connection_id
)
assert not responder.messages
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Transaction acknowledgement message handler."""

from .....messaging.base_handler import (
BaseHandler,
BaseResponder,
HandlerException,
RequestContext,
)

from ..manager import TransactionManager, TransactionManagerError
from ..messages.transaction_acknowledgement import TransactionAcknowledgement


class TransactionAcknowledgementHandler(BaseHandler):
"""Message handler class for Acknowledging transaction."""

async def handle(self, context: RequestContext, responder: BaseResponder):
"""
Handle transaction acknowledgement message.

Args:
context: Request context
responder: Responder callback
"""

self._logger.debug(
f"TransactionAcknowledgementHandler called with context {context}"
)
assert isinstance(context.message, TransactionAcknowledgement)

if not context.connection_ready:
raise HandlerException("No connection established")

profile_session = await context.session()
mgr = TransactionManager(profile_session)
try:
await mgr.receive_transaction_acknowledgement(
context.message, context.connection_record.connection_id
)
except TransactionManagerError:
self._logger.exception("Error receiving transaction acknowledgement")
194 changes: 180 additions & 14 deletions aries_cloudagent/protocols/endorse_transaction/v1_0/manager.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
"""Class to manage transactions."""

from aiohttp import web
import json
import logging
import uuid

from .models.transaction_record import TransactionRecord
from .messages.transaction_request import TransactionRequest
from .messages.endorsed_transaction_response import EndorsedTransactionResponse
from .messages.refused_transaction_response import RefusedTransactionResponse
from .messages.cancel_transaction import CancelTransaction
from .messages.transaction_resend import TransactionResend
from .messages.transaction_job_to_send import TransactionJobToSend
from asyncio import shield
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this would be:

import json
import logging
import uuid

from asyncio import shield
from time import time

from ....connections.models.conn_record import ConnRecord
from ....core.error import BaseError
from ....core.profile import ProfileSession
from ....indy.issuer import IndyIssuerError
from ....ledger.base import BaseLedger
from ....ledger.error import LedgerError
from ....messaging.credential_definitions.util import CRED_DEF_SENT_RECORD_TYPE
from ....messaging.schemas.util import SCHEMA_SENT_RECORD_TYPE
from ....storage.base import StorageRecord
from ....storage.error import StorageNotFoundError
from ....transport.inbound.receipt import MessageReceipt

from .messages.cancel_transaction import CancelTransaction
from .messages.endorsed_transaction_response import EndorsedTransactionResponse
from .messages.refused_transaction_response import RefusedTransactionResponse
from .messages.transaction_acknowledgement import TransactionAcknowledgement
from .messages.transaction_job_to_send import TransactionJobToSend
from .messages.transaction_request import TransactionRequest
from .messages.transaction_resend import TransactionResend
from .models.transaction_record import TransactionRecord
from .transaction_jobs import TransactionJob

It would be around this many imports that adding another one starts to risk duplication, if they're out of order.

from time import time

from ....connections.models.conn_record import ConnRecord
from ....transport.inbound.receipt import MessageReceipt
from ....storage.error import StorageNotFoundError

from ....core.error import BaseError
from ....core.profile import ProfileSession
from ....indy.issuer import IndyIssuerError
from ....ledger.base import BaseLedger
from ....ledger.error import LedgerError
from ....messaging.credential_definitions.util import CRED_DEF_SENT_RECORD_TYPE
from ....messaging.schemas.util import SCHEMA_SENT_RECORD_TYPE
from ....storage.base import StorageRecord
from ....storage.error import StorageNotFoundError
from ....transport.inbound.receipt import MessageReceipt

from .messages.cancel_transaction import CancelTransaction
from .messages.endorsed_transaction_response import EndorsedTransactionResponse
from .messages.refused_transaction_response import RefusedTransactionResponse
from .messages.transaction_acknowledgement import TransactionAcknowledgement
from .messages.transaction_job_to_send import TransactionJobToSend
from .messages.transaction_request import TransactionRequest
from .messages.transaction_resend import TransactionResend
from .models.transaction_record import TransactionRecord
from .transaction_jobs import TransactionJob


class TransactionManagerError(BaseError):
Expand Down Expand Up @@ -283,7 +293,7 @@ async def complete_transaction(self, transaction: TransactionRecord):
"""
Complete a transaction.

This is the final state after the received ledger transaction
This is the final state where the received ledger transaction
is written to the ledger.

Args:
Expand All @@ -294,11 +304,85 @@ async def complete_transaction(self, transaction: TransactionRecord):

"""

transaction.state = TransactionRecord.STATE_TRANSACTION_COMPLETED
profile_session = await self.session
transaction.state = TransactionRecord.STATE_TRANSACTION_ACKED

async with profile_session.profile.session() as session:
await transaction.save(session, reason="Completed transaction")

connection_id = transaction.connection_id

async with profile_session.profile.session() as session:
connection_record = await ConnRecord.retrieve_by_id(session, connection_id)
jobs = await connection_record.metadata_get(self._session, "transaction_jobs")
if not jobs:
raise TransactionManagerError(
"The transaction related jobs are not set up in "
"connection metadata for this connection record"
)
if "transaction_my_job" not in jobs.keys():
raise TransactionManagerError(
'The "transaction_my_job" is not set in "transaction_jobs"'
" in connection metadata for this connection record"
)
if jobs["transaction_my_job"] == TransactionJob.TRANSACTION_AUTHOR.name:
await self.store_record_in_wallet(transaction)

transaction_acknowledgement_message = TransactionAcknowledgement(
thread_id=transaction._id
)

return transaction, transaction_acknowledgement_message

async def receive_transaction_acknowledgement(
self, response: TransactionAcknowledgement, connection_id: str
):
"""
Update the transaction record after receiving the transaction acknowledgement.

Args:
response: The transaction acknowledgement
connection_id: The connection_id related to this Transaction Record
"""

profile_session = await self.session
async with profile_session.profile.session() as session:
transaction = await TransactionRecord.retrieve_by_connection_and_thread(
session, connection_id, response.thread_id
)

if transaction.state != TransactionRecord.STATE_TRANSACTION_ENDORSED:
raise TransactionManagerError(
"Only an endorsed transaction can be written to the ledger."
)

transaction.state = TransactionRecord.STATE_TRANSACTION_ACKED
async with profile_session.profile.session() as session:
await transaction.save(session, reason="Received a transaction ack")

connection_id = transaction.connection_id

try:
async with profile_session.profile.session() as session:
connection_record = await ConnRecord.retrieve_by_id(
session, connection_id
)
except StorageNotFoundError as err:
raise TransactionManagerError(err.roll_up) from err
jobs = await connection_record.metadata_get(self._session, "transaction_jobs")
if not jobs:
raise TransactionManagerError(
"The transaction related jobs are not set up in "
"connection metadata for this connection record"
)
if "transaction_my_job" not in jobs.keys():
raise TransactionManagerError(
'The "transaction_my_job" is not set in "transaction_jobs"'
" in connection metadata for this connection record"
)
if jobs["transaction_my_job"] == TransactionJob.TRANSACTION_AUTHOR.name:
await self.store_record_in_wallet(transaction)

return transaction

async def create_refuse_response(
Expand Down Expand Up @@ -531,7 +615,7 @@ async def set_transaction_their_job(
self._session, receipt.sender_did, receipt.recipient_did
)
except StorageNotFoundError as err:
raise web.HTTPNotFound(reason=err.roll_up) from err
raise TransactionManagerError(err.roll_up) from err

value = await connection.metadata_get(self._session, "transaction_jobs")
if value:
Expand All @@ -541,3 +625,85 @@ async def set_transaction_their_job(
await connection.metadata_set(
self._session, key="transaction_jobs", value=value
)

async def store_record_in_wallet(self, transaction: TransactionRecord):
"""
Store record in wallet.

Args:
transaction: The transaction from which the schema/cred_def
would be stored in wallet.
"""

ledger_transaction = transaction.messages_attach[0]["data"]["json"]

ledger = self._session.inject(BaseLedger)
if not ledger:
reason = "No ledger available"
if not self._session.context.settings.get_value("wallet.type"):
reason += ": missing wallet-type?"
raise TransactionManagerError(reason)

async with ledger:
try:
ledger_response_json = await shield(
ledger.txn_submit(ledger_transaction, sign=False, taa_accept=False)
)
except (IndyIssuerError, LedgerError) as err:
raise TransactionManagerError(err.roll_up) from err

ledger_response = json.loads(ledger_response_json)

# write the wallet non-secrets record
# TODO refactor this code (duplicated from ledger.indy.py)
if ledger_response["result"]["txn"]["type"] == "101":
# schema transaction
schema_id = ledger_response["result"]["txnMetadata"]["txnId"]
schema_id_parts = schema_id.split(":")
public_did = ledger_response["result"]["txn"]["metadata"]["from"]
schema_tags = {
"schema_id": schema_id,
"schema_issuer_did": public_did,
"schema_name": schema_id_parts[-2],
"schema_version": schema_id_parts[-1],
"epoch": str(int(time())),
}
record = StorageRecord(SCHEMA_SENT_RECORD_TYPE, schema_id, schema_tags)
# TODO refactor this code?
async with ledger:
storage = ledger.get_indy_storage()
await storage.add_record(record)

elif ledger_response["result"]["txn"]["type"] == "102":
# cred def transaction
async with ledger:
try:
schema_seq_no = str(ledger_response["result"]["txn"]["data"]["ref"])
schema_response = await shield(ledger.get_schema(schema_seq_no))
except (IndyIssuerError, LedgerError) as err:
raise TransactionManagerError(err.roll_up) from err

schema_id = schema_response["id"]
schema_id_parts = schema_id.split(":")
public_did = ledger_response["result"]["txn"]["metadata"]["from"]
credential_definition_id = ledger_response["result"]["txnMetadata"]["txnId"]
cred_def_tags = {
"schema_id": schema_id,
"schema_issuer_did": schema_id_parts[0],
"schema_name": schema_id_parts[-2],
"schema_version": schema_id_parts[-1],
"issuer_did": public_did,
"cred_def_id": credential_definition_id,
"epoch": str(int(time())),
}
record = StorageRecord(
CRED_DEF_SENT_RECORD_TYPE, credential_definition_id, cred_def_tags
)
# TODO refactor this code?
async with ledger:
storage = ledger.get_indy_storage()
await storage.add_record(record)

else:
# TODO unknown ledger transaction type, just ignore for now ...
pass
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
CANCEL_TRANSACTION = "transactions/1.0/cancel"
TRANSACTION_RESEND = "transactions/1.0/resend"
TRANSACTION_JOB_TO_SEND = "transactions/1.0/transaction_my_job"
TRANSACTION_ACKNOWLEDGEMENT = "transactions/1.0/ack"
ATTACHED_MESSAGE = "transactions/1.0/message"
PROTOCOL_PACKAGE = "aries_cloudagent.protocols.endorse_transaction.v1_0"

Expand All @@ -36,5 +37,9 @@
TRANSACTION_JOB_TO_SEND: (
f"{PROTOCOL_PACKAGE}.messages.transaction_job_to_send.TransactionJobToSend"
),
TRANSACTION_ACKNOWLEDGEMENT: (
f"{PROTOCOL_PACKAGE}.messages.transaction_acknowledgement"
".TransactionAcknowledgement"
),
}
)
Loading