Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: mediation record checks for pickup v2 #736

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions packages/core/src/modules/routing/RecipientModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ export class RecipientModule {
await this.openMediationWebSocket(mediator)
if (mediator.pickupStrategy === MediatorPickupStrategy.PickUpV2) {
// Start Pickup v2 protocol to receive messages received while websocket offline
await this.mediationRecipientService.requestStatus({ mediatorId: mediator.id })
await this.sendStatusRequest({ mediatorId: mediator.id })
}
} catch (error) {
this.logger.warn('Unable to re-open websocket connection to mediator', { error })
Expand All @@ -182,7 +182,7 @@ export class RecipientModule {
case MediatorPickupStrategy.PickUpV2:
this.agentConfig.logger.info(`Starting pickup of messages from mediator '${mediator.id}'`)
await this.openWebSocketAndPickUp(mediator)
await this.mediationRecipientService.requestStatus({ mediatorId: mediator.id })
await this.sendStatusRequest({ mediatorId: mediator.id })
break
case MediatorPickupStrategy.PickUpV1: {
// Explicit means polling every X seconds with batch message
Expand All @@ -207,6 +207,17 @@ export class RecipientModule {
}
}

private async sendStatusRequest(config: { mediatorId: string; recipientKey?: string }) {
const mediationRecord = await this.mediationRecipientService.getById(config.mediatorId)

const statusRequestMessage = await this.mediationRecipientService.createStatusRequest(mediationRecord, {
recipientKey: config.recipientKey,
})

const mediatorConnection = await this.connectionService.getById(mediationRecord.connectionId)
return this.messageSender.sendMessage(createOutboundMessage(mediatorConnection, statusRequestMessage))
}

private async getPickupStrategyForMediator(mediator: MediationRecord) {
let mediatorPickupStrategy = mediator.pickupStrategy ?? this.agentConfig.mediatorPickupStrategy

Expand Down
178 changes: 160 additions & 18 deletions packages/core/src/modules/routing/__tests__/mediationRecipient.test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import type { Wallet } from '../../../wallet/Wallet'

import { getAgentConfig } from '../../../../tests/helpers'
import { getAgentConfig, getMockConnection, mockFunction } from '../../../../tests/helpers'
import { EventEmitter } from '../../../agent/EventEmitter'
import { MessageReceiver } from '../../../agent/MessageReceiver'
import { AgentEventTypes } from '../../../agent/Events'
import { MessageSender } from '../../../agent/MessageSender'
import { InboundMessageContext } from '../../../agent/models/InboundMessageContext'
import { Attachment } from '../../../decorators/attachment/Attachment'
import { AriesFrameworkError } from '../../../error'
import { IndyWallet } from '../../../wallet/IndyWallet'
import { ConnectionRepository } from '../../connections'
import { ConnectionRepository, ConnectionState } from '../../connections'
import { ConnectionService } from '../../connections/services/ConnectionService'
import { DeliveryRequestMessage, MessageDeliveryMessage, MessagesReceivedMessage, StatusMessage } from '../messages'
import { MediationRepository } from '../repository'
import { MediationRole, MediationState } from '../models'
import { MediationRecord, MediationRepository } from '../repository'
import { MediationRecipientService } from '../services'

jest.mock('../repository/MediationRepository')
Expand All @@ -19,14 +21,18 @@ const MediationRepositoryMock = MediationRepository as jest.Mock<MediationReposi
jest.mock('../../connections/repository/ConnectionRepository')
const ConnectionRepositoryMock = ConnectionRepository as jest.Mock<ConnectionRepository>

jest.mock('../../../agent/EventEmitter')
const EventEmitterMock = EventEmitter as jest.Mock<EventEmitter>

jest.mock('../../../agent/MessageSender')
const MessageSenderMock = MessageSender as jest.Mock<MessageSender>

jest.mock('../../../agent/MessageReceiver')
const MessageReceiverMock = MessageReceiver as jest.Mock<MessageReceiver>

const connectionImageUrl = 'https://example.com/image.png'

const mockConnection = getMockConnection({
state: ConnectionState.Complete,
})

describe('MediationRecipientService', () => {
const config = getAgentConfig('MediationRecipientServiceTest', {
endpoints: ['http://agent.com:8080'],
Expand All @@ -40,7 +46,7 @@ describe('MediationRecipientService', () => {
let connectionRepository: ConnectionRepository
let messageSender: MessageSender
let mediationRecipientService: MediationRecipientService
let messageReceiver: MessageReceiver
let mediationRecord: MediationRecord

beforeAll(async () => {
wallet = new IndyWallet(config)
Expand All @@ -53,64 +59,200 @@ describe('MediationRecipientService', () => {
})

beforeEach(async () => {
eventEmitter = new EventEmitter(config)
eventEmitter = new EventEmitterMock()
connectionRepository = new ConnectionRepositoryMock()
connectionService = new ConnectionService(wallet, config, connectionRepository, eventEmitter)
mediationRepository = new MediationRepositoryMock()
messageSender = new MessageSenderMock()
messageReceiver = new MessageReceiverMock()

// Mock default return value
mediationRecord = new MediationRecord({
connectionId: 'connectionId',
role: MediationRole.Recipient,
state: MediationState.Granted,
threadId: 'threadId',
})
mockFunction(mediationRepository.getByConnectionId).mockResolvedValue(mediationRecord)

mediationRecipientService = new MediationRecipientService(
wallet,
connectionService,
messageSender,
config,
mediationRepository,
eventEmitter,
messageReceiver
eventEmitter
)
})

describe('createStatusRequest', () => {
it('creates a status request message', async () => {
const statusRequestMessage = await mediationRecipientService.createStatusRequest(mediationRecord, {
recipientKey: 'a-key',
})

expect(statusRequestMessage).toMatchObject({
id: expect.any(String),
recipientKey: 'a-key',
})
})

it('it throws an error when the mediation record has incorrect role or state', async () => {
mediationRecord.role = MediationRole.Mediator
await expect(mediationRecipientService.createStatusRequest(mediationRecord)).rejects.toThrowError(
'Mediation record has invalid role MEDIATOR. Expected role RECIPIENT.'
)

mediationRecord.role = MediationRole.Recipient
mediationRecord.state = MediationState.Requested

await expect(mediationRecipientService.createStatusRequest(mediationRecord)).rejects.toThrowError(
'Mediation record is not ready to be used. Expected granted, found invalid state requested'
)
})
})

describe('processStatus', () => {
it('if status request has a message count of zero returns nothing', async () => {
const status = new StatusMessage({
messageCount: 0,
})
const deliveryRequestMessage = await mediationRecipientService.processStatus(status)

const messageContext = new InboundMessageContext(status, { connection: mockConnection })
const deliveryRequestMessage = await mediationRecipientService.processStatus(messageContext)
expect(deliveryRequestMessage).toBeNull()
})

it('if it has a message count greater than zero return a valid delivery request', async () => {
const status = new StatusMessage({
messageCount: 1,
})
const deliveryRequestMessage = await mediationRecipientService.processStatus(status)
const messageContext = new InboundMessageContext(status, { connection: mockConnection })

const deliveryRequestMessage = await mediationRecipientService.processStatus(messageContext)
expect(deliveryRequestMessage)
expect(deliveryRequestMessage).toEqual(new DeliveryRequestMessage({ id: deliveryRequestMessage?.id, limit: 1 }))
})

it('it throws an error when the mediation record has incorrect role or state', async () => {
const status = new StatusMessage({
messageCount: 1,
})
const messageContext = new InboundMessageContext(status, { connection: mockConnection })

mediationRecord.role = MediationRole.Mediator
await expect(mediationRecipientService.processStatus(messageContext)).rejects.toThrowError(
'Mediation record has invalid role MEDIATOR. Expected role RECIPIENT.'
)

mediationRecord.role = MediationRole.Recipient
mediationRecord.state = MediationState.Requested

await expect(mediationRecipientService.processStatus(messageContext)).rejects.toThrowError(
'Mediation record is not ready to be used. Expected granted, found invalid state requested'
)
})
})

describe('processDelivery', () => {
it('if the delivery has no attachments expect an error', async () => {
expect(mediationRecipientService.processDelivery({} as MessageDeliveryMessage)).rejects.toThrowError(
const messageContext = new InboundMessageContext({} as MessageDeliveryMessage, { connection: mockConnection })

await expect(mediationRecipientService.processDelivery(messageContext)).rejects.toThrowError(
new AriesFrameworkError('Error processing attachments')
)
})
it('other we should expect a message recieved with an message id list in it', async () => {

it('should return a message received with an message id list in it', async () => {
const messageDeliveryMessage = new MessageDeliveryMessage({
attachments: [
new Attachment({
id: '1',
data: {},
data: {
json: {
a: 'value',
},
},
}),
],
})
const messagesReceivedMessage = await mediationRecipientService.processDelivery(messageDeliveryMessage)
const messageContext = new InboundMessageContext(messageDeliveryMessage, { connection: mockConnection })

const messagesReceivedMessage = await mediationRecipientService.processDelivery(messageContext)

expect(messagesReceivedMessage).toEqual(
new MessagesReceivedMessage({
id: messagesReceivedMessage.id,
messageIdList: ['1'],
})
)
})

it('calls the event emitter for each message', async () => {
const messageDeliveryMessage = new MessageDeliveryMessage({
attachments: [
new Attachment({
id: '1',
data: {
json: {
first: 'value',
},
},
}),
new Attachment({
id: '2',
data: {
json: {
second: 'value',
},
},
}),
],
})
const messageContext = new InboundMessageContext(messageDeliveryMessage, { connection: mockConnection })

await mediationRecipientService.processDelivery(messageContext)

expect(eventEmitter.emit).toHaveBeenCalledTimes(2)
expect(eventEmitter.emit).toHaveBeenNthCalledWith(1, {
type: AgentEventTypes.AgentMessageReceived,
payload: {
message: { first: 'value' },
},
})
expect(eventEmitter.emit).toHaveBeenNthCalledWith(2, {
type: AgentEventTypes.AgentMessageReceived,
payload: {
message: { second: 'value' },
},
})
})

it('it throws an error when the mediation record has incorrect role or state', async () => {
const messageDeliveryMessage = new MessageDeliveryMessage({
attachments: [
new Attachment({
id: '1',
data: {
json: {
a: 'value',
},
},
}),
],
})
const messageContext = new InboundMessageContext(messageDeliveryMessage, { connection: mockConnection })

mediationRecord.role = MediationRole.Mediator
await expect(mediationRecipientService.processDelivery(messageContext)).rejects.toThrowError(
'Mediation record has invalid role MEDIATOR. Expected role RECIPIENT.'
)

mediationRecord.role = MediationRole.Recipient
mediationRecord.state = MediationState.Requested

await expect(mediationRecipientService.processDelivery(messageContext)).rejects.toThrowError(
'Mediation record is not ready to be used. Expected granted, found invalid state requested'
)
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class MessageDeliveryHandler implements Handler {

public async handle(messageContext: InboundMessageContext<MessageDeliveryMessage>) {
const connection = messageContext.assertReadyConnection()
const deliveryReceivedMessage = await this.mediationRecipientService.processDelivery(messageContext.message)
const deliveryReceivedMessage = await this.mediationRecipientService.processDelivery(messageContext)

if (deliveryReceivedMessage) {
return createOutboundMessage(connection, deliveryReceivedMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class StatusHandler implements Handler {

public async handle(messageContext: InboundMessageContext<StatusMessage>) {
const connection = messageContext.assertReadyConnection()
const deliveryRequestMessage = this.mediatorRecipientService.processStatus(messageContext.message)
const deliveryRequestMessage = await this.mediatorRecipientService.processStatus(messageContext)

if (deliveryRequestMessage) {
return createOutboundMessage(connection, deliveryRequestMessage)
Expand Down
Loading