diff --git a/packages/core/src/agent/Agent.ts b/packages/core/src/agent/Agent.ts index bb124992cd..49f6499e4e 100644 --- a/packages/core/src/agent/Agent.ts +++ b/packages/core/src/agent/Agent.ts @@ -110,26 +110,6 @@ export class Agent extends BaseAge } super(agentConfig, dependencyManager) - - const stop$ = this.dependencyManager.resolve>(InjectionSymbols.Stop$) - - // Listen for new messages (either from transports or somewhere else in the framework / extensions) - this.messageSubscription = this.eventEmitter - .observable(AgentEventTypes.AgentMessageReceived) - .pipe( - takeUntil(stop$), - concatMap((e) => - this.messageReceiver - .receiveMessage(e.payload.message, { - connection: e.payload.connection, - contextCorrelationId: e.payload.contextCorrelationId, - }) - .catch((error) => { - this.logger.error('Failed to process message', { error }) - }) - ) - ) - .subscribe() } public registerInboundTransport(inboundTransport: InboundTransport) { @@ -199,6 +179,26 @@ export class Agent extends BaseAge await this.mediator.initialize() await this.mediationRecipient.initialize() + const stop$ = this.dependencyManager.resolve>(InjectionSymbols.Stop$) + + // Listen for new messages (either from transports or somewhere else in the framework / extensions) + this.messageSubscription = this.eventEmitter + .observable(AgentEventTypes.AgentMessageReceived) + .pipe( + takeUntil(stop$), + concatMap((e) => + this.messageReceiver + .receiveMessage(e.payload.message, { + connection: e.payload.connection, + contextCorrelationId: e.payload.contextCorrelationId, + }) + .catch((error) => { + this.logger.error('Failed to process message', { error }) + }) + ) + ) + .subscribe() + this._isInitialized = true }