The most basic and unopinionated implementation of NSQ transport for NestJS microservices.
finish()
is called automatically when no errors are thrown while handling, otherwise requeue()
is called.
No request-response messaging support and it won't be added, as it's better to use appropriate RPC transports
npm i nest-nsq-transport nsqjs
npm i -D @types/nsqjs
import { Message } from 'nsqjs';
import { NSQStrategy, NSQStrategyOptions, NSQPattern } from 'nest-nsq-transport';
NestFactory.createMicroservice(
AppModule,
{
strategy: new NSQStrategy(<NSQStrategyOptions>{
// Optional deserializer, please see implementation in the sources.
//
// Default deserializer converts message data from Buffer to utf8 string.
// Could be replaced with the built-in JSONDeserializer or a custom one.
deserializer: new MyDeserializer();
// optional default options that will be passed to all nsqjs.Reader's of
// each channel. It's possible to skip this field and describe options
// for each channel separately with `channels` field
defaultChannelOptions: {
nsqdTCPAddresses: [process.env.NSQD_TCP_ADDR],
// or
lookupdHTTPAddresses: [process.env.LOOKUPD_HTTP_ADDR],
// and all the other options, see here:
// https://github.com/dudleycarr/nsqjs#new-readertopic-channel-options
// with one extra option:
// as `requeue` is called automatically on error, there is no way to
// pass parameters, but it's possible to provide either default args:
requeueParams: [1, true],
// or a callback
requeueParams: (message: Message, pattern: NSQPattern) => [1, true],
},
// Optional map of topic -> channel -> options
channels: {
'my-topic': {
'my-channel': {
// this will be merged with the defaultChannelOptions
},
},
},
});
},
)
import { ClientsModule } from '@nestjs/microservices';
import { NSQClient, NSQClientOpitons } from 'nest-nsq-transport';
export const clientToken = Symbol();
@Module({
imports: [
ClientsModule.register([
{
name: clientToken,
customClass: NSQClient,
options: <NSQClientOpitons>{
// Optional serializer, please see implementation in the sources.
//
// Default serializer converts emitted data to a string and pass it as
// a Buffer to nsqjs.Writer.publish method. Could be replaced with the
// built-in JSONSerializer or a custom one.
serializer: new MySerializer(),
// Connections options:
// ether full nsqd tcp address
nsqdURL: process.env.NSQD_TCP_ADDR, // in `host:port` pattern
// or host and port separately
nsqdHost: process.env.NSQD_HOST,
nsqdPort: Number(process.env.NSQD_PORT),
// Other options from nsqjs.ConnectionConfigOptions, see here:
// https://github.com/dudleycarr/nsqjs#new-writernsqdhost-nsqdport-options
// ...
},
},
]),
],
})
class AppModule {}
import { ClientProxy, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { NSQContext } from 'nest-nsq-transport';
@Controller()
class TestController {
constructor(
// Token that passed to ClientsModule.register
@Inject(clientToken) private readonly client: ClientProxy
) {}
// To get only payload no decorators needed
@EventPattern('my-topic/my-channel')
async handle(payload: MyType) {
//
}
// Decorators are required to get context
@EventPattern('my-topic/my-channel-two')
async handle(
@Payload() payload: MyType,
@Ctx() ctx: NSQContext
) {
//
}
async emit() {
// This is an example how to emit data. Serializer which converts data to
// Buffer have to be provided when client is registered. There are two
// built-in serializers: JSONSerializer and StringSerializer
await this.client.emit('my-topic', data as MyType).toPromise();
}
// Also if `maxAttempts` is set in NSQStrategyOptions message could be
// discarded. It's possible to subscribe on discarded messages in a separate
// handler adding `/discard` suffix to pattern. If `maxAttempts` is set and
// such separate discard-handler is not provided then discarded message will
// be sent to the original handler again. But it's always possible to check
// current attempt with `ctx.message.attempts`, which starts from 1.
@EventPattern('my-topic/my-channel/discard')
async handle(
@Payload() payload: MyType,
@Ctx() ctx: NSQContext
) {
await this.client.emit(
`${ctx.pattern.topic}-${ctx.pattern.channel}-dead-letter`,
payload,
).toPromise()
}
}