Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: close session early if no return route #715

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ export class MessageReceiver {
session.connection = connection ?? undefined
messageContext.sessionId = session.id
this.transportService.saveSession(session)
} else if (session) {
// No need to wait for session to stay open if we're not actually going to respond to the message.
await session.close()
}

await this.dispatcher.dispatch(messageContext)
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/agent/TransportService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ export interface TransportSession {
inboundMessage?: AgentMessage
connection?: ConnectionRecord
send(encryptedMessage: EncryptedMessage): Promise<void>
close(): Promise<void>
}
4 changes: 4 additions & 0 deletions packages/core/src/agent/__tests__/stubs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@ export class DummyTransportSession implements TransportSession {
public send(): Promise<void> {
throw new Error('Method not implemented.')
}

public close(): Promise<void> {
throw new Error('Method not implemented.')
}
}
6 changes: 6 additions & 0 deletions packages/node/src/transport/HttpInboundTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ export class HttpTransportSession implements TransportSession {
this.res = res
}

public async close(): Promise<void> {
if (!this.res.headersSent) {
this.res.status(200).end()
}
}

public async send(encryptedMessage: EncryptedMessage): Promise<void> {
if (this.res.headersSent) {
throw new AriesFrameworkError(`${this.type} transport session has been closed.`)
Expand Down
6 changes: 6 additions & 0 deletions packages/node/src/transport/WsInboundTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,10 @@ export class WebSocketTransportSession implements TransportSession {

this.socket.send(JSON.stringify(encryptedMessage))
}

public async close(): Promise<void> {
if (this.socket.readyState === WebSocket.OPEN) {
this.socket.close()
}
}
}
4 changes: 4 additions & 0 deletions tests/transport/SubjectInboundTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,8 @@ export class SubjectTransportSession implements TransportSession {
public async send(encryptedMessage: EncryptedMessage): Promise<void> {
this.replySubject.next({ message: encryptedMessage })
}

public async close(): Promise<void> {
this.replySubject.complete()
}
}
43 changes: 21 additions & 22 deletions tests/transport/SubjectOutboundTransport.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import type { Agent, Logger } from '../../packages/core/src'
import type { OutboundTransport } from '../../packages/core/src/transport/OutboundTransport'
import type { OutboundPackage } from '../../packages/core/src/types'
import type { SubjectMessage } from './SubjectInboundTransport'
import type { Subscription } from 'rxjs'
import type { OutboundPackage, OutboundTransport, Agent, Logger } from '@aries-framework/core'

import { Subject } from 'rxjs'
import { takeUntil, Subject, take } from 'rxjs'

import { InjectionSymbols, AriesFrameworkError } from '../../packages/core/src'
import { InjectionSymbols, AriesFrameworkError } from '@aries-framework/core'

export class SubjectOutboundTransport implements OutboundTransport {
private logger!: Logger
private ourSubject = new Subject<SubjectMessage>()
private returnRouteMessageSubscription?: Subscription
private subjectMap: { [key: string]: Subject<SubjectMessage> | undefined }
private agent!: Agent

public supportedSchemes = ['rxjs']

Expand All @@ -21,23 +17,13 @@ export class SubjectOutboundTransport implements OutboundTransport {
}

public async start(agent: Agent): Promise<void> {
this.agent = agent

this.logger = agent.injectionContainer.resolve(InjectionSymbols.Logger)
this.subscribe(agent)
}

public async stop(): Promise<void> {
this.returnRouteMessageSubscription?.unsubscribe()
this.ourSubject.complete()
}

private subscribe(agent: Agent) {
this.returnRouteMessageSubscription = this.ourSubject.subscribe({
next: async ({ message }: SubjectMessage) => {
this.logger.test('Received message')

await agent.receiveMessage(message)
},
})
// No logic needed
}

public async sendMessage(outboundPackage: OutboundPackage) {
Expand All @@ -56,6 +42,19 @@ export class SubjectOutboundTransport implements OutboundTransport {
throw new AriesFrameworkError(`No subject found for endpoint ${endpoint}`)
}

subject.next({ message: payload, replySubject: this.ourSubject })
// Create a replySubject just for this session. Both ends will be able to close it,
// mimicking a transport like http or websocket. Close session automatically when agent stops
const replySubject = new Subject<SubjectMessage>()
this.agent.config.stop$.pipe(take(1)).subscribe(() => !replySubject.closed && replySubject.complete())

replySubject.pipe(takeUntil(this.agent.config.stop$)).subscribe({
next: async ({ message }: SubjectMessage) => {
this.logger.test('Received message')

await this.agent.receiveMessage(message)
},
})

subject.next({ message: payload, replySubject })
}
}