Skip to content

Commit

Permalink
fix: listen to incoming messages on agent initialize not constructor
Browse files Browse the repository at this point in the history
Signed-off-by: Niall Shaw <niall.shaw@absa.africa>
  • Loading branch information
niall-shaw committed Aug 8, 2023
1 parent e448a2a commit e530692
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,26 +110,6 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
}

super(agentConfig, dependencyManager)

const stop$ = this.dependencyManager.resolve<Subject<boolean>>(InjectionSymbols.Stop$)

// Listen for new messages (either from transports or somewhere else in the framework / extensions)
this.messageSubscription = this.eventEmitter
.observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
.pipe(
takeUntil(stop$),
concatMap((e) =>
this.messageReceiver
.receiveMessage(e.payload.message, {
connection: e.payload.connection,
contextCorrelationId: e.payload.contextCorrelationId,
})
.catch((error) => {
this.logger.error('Failed to process message', { error })
})
)
)
.subscribe()
}

public registerInboundTransport(inboundTransport: InboundTransport) {
Expand Down Expand Up @@ -199,6 +179,26 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
await this.mediator.initialize()
await this.mediationRecipient.initialize()

const stop$ = this.dependencyManager.resolve<Subject<boolean>>(InjectionSymbols.Stop$)

// Listen for new messages (either from transports or somewhere else in the framework / extensions)
this.messageSubscription = this.eventEmitter
.observable<AgentMessageReceivedEvent>(AgentEventTypes.AgentMessageReceived)
.pipe(
takeUntil(stop$),
concatMap((e) =>
this.messageReceiver
.receiveMessage(e.payload.message, {
connection: e.payload.connection,
contextCorrelationId: e.payload.contextCorrelationId,
})
.catch((error) => {
this.logger.error('Failed to process message', { error })
})
)
)
.subscribe()

this._isInitialized = true
}

Expand Down

0 comments on commit e530692

Please sign in to comment.