Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add message handler middleware and fallback #1894

Merged
merged 4 commits into from
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions packages/core/src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import type { AgentMessage } from './AgentMessage'
import type { AgentMessageProcessedEvent } from './Events'
import type { MessageHandlerMiddleware } from './MessageHandlerMiddleware'
import type { InboundMessageContext } from './models/InboundMessageContext'

import { InjectionSymbols } from '../constants'
import { CredoError } from '../error/CredoError'
import { CredoError } from '../error'
import { Logger } from '../logger'
import { ProblemReportError, ProblemReportReason } from '../modules/problem-reports'
import { injectable, inject } from '../plugins'
import { parseMessageType } from '../utils/messageType'
import { canHandleMessageType, parseMessageType } from '../utils/messageType'

import { ProblemReportMessage } from './../modules/problem-reports/messages/ProblemReportMessage'
import { EventEmitter } from './EventEmitter'
import { AgentEventTypes } from './Events'
import { MessageHandlerMiddlewareRunner } from './MessageHandlerMiddleware'
import { MessageHandlerRegistry } from './MessageHandlerRegistry'
import { MessageSender } from './MessageSender'
import { OutboundMessageContext } from './models'
Expand All @@ -34,22 +37,58 @@ class Dispatcher {
this.logger = logger
}

private defaultHandlerMiddleware: MessageHandlerMiddleware = async (inboundMessageContext, next) => {
let messageHandler = inboundMessageContext.messageHandler

if (!messageHandler && inboundMessageContext.agentContext.dependencyManager.fallbackMessageHandler) {
messageHandler = {
supportedMessages: [],
handle: inboundMessageContext.agentContext.dependencyManager.fallbackMessageHandler,
}
}

if (!messageHandler) {
throw new ProblemReportError(
`Error handling message ${inboundMessageContext.message.id} with type ${inboundMessageContext.message.type}. The message type is not supported`,
{
problemCode: ProblemReportReason.MessageParseFailure,
}
)
}

const outboundMessage = await messageHandler.handle(inboundMessageContext)
if (outboundMessage) {
inboundMessageContext.setResponseMessage(outboundMessage)
}

await next()
}

public async dispatch(messageContext: InboundMessageContext): Promise<void> {
const { agentContext, connection, senderKey, recipientKey, message } = messageContext
const messageHandler = this.messageHandlerRegistry.getHandlerForMessageType(message.type)

if (!messageHandler) {
throw new CredoError(`No handler for message type "${message.type}" found`)
// Set default handler if available, middleware can still override the message handler
const messageHandler = this.messageHandlerRegistry.getHandlerForMessageType(message.type)
if (messageHandler) {
messageContext.setMessageHandler(messageHandler)
}

let outboundMessage: OutboundMessageContext<AgentMessage> | void
let outboundMessage: OutboundMessageContext<AgentMessage> | undefined

try {
outboundMessage = await messageHandler.handle(messageContext)
const middlewares = [...agentContext.dependencyManager.messageHandlerMiddlewares, this.defaultHandlerMiddleware]
await MessageHandlerMiddlewareRunner.run(middlewares, messageContext)

outboundMessage = messageContext.responseMessage
} catch (error) {
const problemReportMessage = error.problemReport

if (problemReportMessage instanceof ProblemReportMessage && messageContext.connection) {
const messageType = parseMessageType(messageContext.message.type)
if (canHandleMessageType(ProblemReportMessage, messageType)) {
throw new CredoError(`Not sending problem report in response to problem report: ${message}`)
}

const { protocolUri: problemReportProtocolUri } = parseMessageType(problemReportMessage.type)
const { protocolUri: inboundProtocolUri } = parseMessageType(messageContext.message.type)

Expand Down Expand Up @@ -91,6 +130,7 @@ class Dispatcher {

await this.messageSender.sendMessage(outboundMessage)
}

// Emit event that allows to hook into received messages
this.eventEmitter.emit<AgentMessageProcessedEvent>(agentContext, {
type: AgentEventTypes.AgentMessageProcessed,
Expand Down
26 changes: 26 additions & 0 deletions packages/core/src/agent/MessageHandlerMiddleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { InboundMessageContext } from './models/InboundMessageContext'

export interface MessageHandlerMiddleware {
(inboundMessageContext: InboundMessageContext, next: () => Promise<void>): Promise<void>
}

export class MessageHandlerMiddlewareRunner {
public static async run(middlewares: MessageHandlerMiddleware[], inboundMessageContext: InboundMessageContext) {
const compose = (middlewares: MessageHandlerMiddleware[]) => {
return async function (inboundMessageContext: InboundMessageContext) {
let index = -1
async function dispatch(i: number): Promise<void> {
if (i <= index) throw new Error('next() called multiple times')
index = i
const fn = middlewares[i]
if (!fn) return
await fn(inboundMessageContext, () => dispatch(i + 1))
}
await dispatch(0)
}
}

const composed = compose(middlewares)
await composed(inboundMessageContext)
}
}
10 changes: 2 additions & 8 deletions packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { AgentMessage } from './AgentMessage'
import type { DecryptedMessageContext } from './EnvelopeService'
import type { TransportSession } from './TransportService'
import type { AgentContext } from './context'
Expand All @@ -16,6 +15,7 @@ import { isValidJweStructure } from '../utils/JWE'
import { JsonTransformer } from '../utils/JsonTransformer'
import { canHandleMessageType, parseMessageType, replaceLegacyDidSovPrefixOnMessage } from '../utils/messageType'

import { AgentMessage } from './AgentMessage'
import { Dispatcher } from './Dispatcher'
import { EnvelopeService } from './EnvelopeService'
import { MessageHandlerRegistry } from './MessageHandlerRegistry'
Expand Down Expand Up @@ -250,13 +250,7 @@ export class MessageReceiver {
replaceLegacyDidSovPrefixOnMessage(message)

const messageType = message['@type']
const MessageClass = this.messageHandlerRegistry.getMessageClassForMessageType(messageType)

if (!MessageClass) {
throw new ProblemReportError(`No message class found for message type "${messageType}"`, {
problemCode: ProblemReportReason.MessageParseFailure,
})
}
const MessageClass = this.messageHandlerRegistry.getMessageClassForMessageType(messageType) ?? AgentMessage

// Cast the plain JSON object to specific instance of Message extended from AgentMessage
let messageTransformed: AgentMessage
Expand Down
Loading
Loading