Kafka for NestJS
- Multiple connections
- Consumer and Producer with Schema Registry support (using kafkajs and @kafkajs/confluent-schema-registry under the hood)
- Integration with nest-template
- Consumer
- The subscribed topic is not static; you can pick it from config
- Process message in async context with Tracing and Logging
- String, JSON and Schema Registry decoders for key and value, headers decoder with array support
- Dead letter queue pattern support with smart retry mechanism
- Support custom decoders and error handling patterns
- Node.js v14 LTS or later
- Yarn
yarn add @byndyusoft/nest-kafka @byndyusoft/class-validator-extended @byndyusoft/nest-opentracing @byndyusoft/nest-pino @kafkajs/confluent-schema-registry @nestjs/common @nestjs/microservices class-transformer class-validator kafkajs rxjs
1. Create KafkaConfigDto
import {
KafkaClusterConfigDto,
KafkaConsumerConfigDto,
KafkaProducerConfigDto,
KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";
import { Type } from "class-transformer";
import { IsDefined, IsString, ValidateNested } from "class-validator";
/**
 * Kafka section of the application config: connection settings for the
 * cluster, consumer, producer and schema registry, plus the topic names
 * used by the examples below.
 */
export class KafkaConfigDto {
// Nested DTOs: @Type tells class-transformer which class to instantiate,
// @ValidateNested makes class-validator validate the nested object as well.
@Type(() => KafkaClusterConfigDto)
@IsDefined()
@ValidateNested()
public readonly cluster!: KafkaClusterConfigDto;
@Type(() => KafkaConsumerConfigDto)
@IsDefined()
@ValidateNested()
public readonly consumer!: KafkaConsumerConfigDto;
@Type(() => KafkaProducerConfigDto)
@IsDefined()
@ValidateNested()
public readonly producer!: KafkaProducerConfigDto;
@Type(() => KafkaSchemaRegistryArgsConfigDto)
@IsDefined()
@ValidateNested()
public readonly schemaRegistry!: KafkaSchemaRegistryArgsConfigDto;
// Topic the consumer examples subscribe to (read through `topicPicker`).
@IsString()
public readonly topic!: string;
// Topic used by the error-topic exception filter examples (read through `errorTopicPicker`).
@IsString()
public readonly errorTopic!: string;
}
2. Add KafkaConfigDto
into ConfigDto
import { Type } from "class-transformer";
import { IsDefined, ValidateNested } from "class-validator";
import { KafkaConfigDto } from "./kafkaConfigDto";
/**
 * Root application config; the Kafka settings live under the `kafka` property.
 */
export class ConfigDto {
/// ...
// Nested Kafka config, instantiated as KafkaConfigDto and validated recursively.
@Type(() => KafkaConfigDto)
@IsDefined()
@ValidateNested()
public readonly kafka!: KafkaConfigDto;
/// ...
}
3. Add env variables mapping
import { Module } from "@nestjs/common";
import { ConfigDto } from "./dtos";
// Example config module: maps environment variables onto the ConfigDto shape.
@Module({})
export class ConfigModule {
// ...
// Builds the plain config object from `process.env`.
// NOTE(review): `process.env` values are strings (or undefined); presumably the
// elided code transforms and validates them into the DTO types — confirm.
private static __loadConfig(): ConfigDto {
const plainConfig: ConfigDto = {
// ...
kafka: {
// Cluster connection settings; only `brokers` is asserted to be present.
cluster: {
brokers: process.env.KAFKA_BROKERS as string,
saslMechanism: process.env.KAFKA_SASL_MECHANISM,
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD,
ssl: process.env.KAFKA_SSL,
ca: process.env.KAFKA_CA,
connectionTimeout: process.env.KAFKA_CONNECTION_TIMEOUT, // default is 1 s.
},
// Consumer settings; `??` supplies the fallback when the variable is unset.
consumer: {
groupId: process.env.KAFKA_CONSUMER_GROUP_ID as string,
allowAutoTopicCreation:
process.env.KAFKA_CONSUMER_ALLOW_AUTO_TOPIC_CREATION ?? true,
sessionTimeout: process.env.KAFKA_SESSION_TIMEOUT_MS ?? 30000,
heartbeatInterval: process.env.KAFKA_HEARTBEAT_INTERVAL_MS ?? 3000,
},
// Producer settings.
producer: {
allowAutoTopicCreation:
process.env.KAFKA_PRODUCER_ALLOW_AUTO_TOPIC_CREATION ?? true,
},
// Schema registry connection settings; only `host` is asserted to be present.
schemaRegistry: {
host: process.env.KAFKA_SCHEMA_REGISTRY_HOST as string,
username: process.env.KAFKA_SCHEMA_REGISTRY_USERNAME,
password: process.env.KAFKA_SCHEMA_REGISTRY_PASSWORD,
},
// Topic names read by the consumer and exception filter examples.
topic: process.env.KAFKA_TOPIC as string,
errorTopic: process.env.KAFKA_ERROR_TOPIC as string,
},
// ...
};
// ...
}
}
4. Import KafkaModule
import {
  KafkaClusterConfigDto,
  KafkaConsumerConfigDto,
  KafkaModule,
  KafkaProducerConfigDto,
  KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";
import { Module } from "@nestjs/common";

import { ConfigDto } from "./config";

/**
 * Registers KafkaModule with a single connection built from `config.kafka`.
 *
 * Each `*ConfigDto.toRawConfig(...)` call converts the validated DTO into the
 * raw options object passed to the connection.
 */
@Module({
  imports: [
    // Extra modules
    // ...
    KafkaModule.registerAsync({
      inject: [ConfigDto],
      useFactory: (config: ConfigDto) => ({
        connections: [
          {
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(config.kafka.consumer),
            producer: KafkaProducerConfigDto.toRawConfig(config.kafka.producer),
            schemaRegistry: {
              args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                config.kafka.schemaRegistry,
              ),
            },
          },
        ],
        // Arguments handed to the `topicPicker` callbacks, e.g.
        // `(config: ConfigDto) => config.kafka.topic`.
        topicPickerArgs: [config],
      }),
    }),
    // ...
  ],
})
export class InfrastructureModule {
  // ...
}
4.1. You can describe multiple connections (then use the connectionName
parameter in some functions to specify which connection to use)
import {
  KafkaClusterConfigDto,
  KafkaConsumerConfigDto,
  KafkaModule,
  KafkaProducerConfigDto,
  KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";
import { Module } from "@nestjs/common";

import { ConfigDto } from "./config";

/**
 * Registers KafkaModule with two named connections, "connection1" and
 * "connection2", built from `config.kafka1` and `config.kafka2`.
 */
@Module({
  imports: [
    // Extra modules
    // ...
    KafkaModule.registerAsync({
      inject: [ConfigDto],
      useFactory: (config: ConfigDto) => ({
        connections: [
          {
            // The name identifies this connection among several.
            name: "connection1",
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka1.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(
              config.kafka1.consumer,
            ),
            producer: KafkaProducerConfigDto.toRawConfig(
              config.kafka1.producer,
            ),
            schemaRegistry: {
              args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                config.kafka1.schemaRegistry,
              ),
            },
          },
          {
            name: "connection2",
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka2.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(
              config.kafka2.consumer,
            ),
            producer: KafkaProducerConfigDto.toRawConfig(
              config.kafka2.producer,
            ),
            schemaRegistry: {
              args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                config.kafka2.schemaRegistry,
              ),
            },
          },
        ],
        // Arguments handed to the `topicPicker` callbacks.
        topicPickerArgs: [config],
      }),
    }),
    // ...
  ],
})
export class InfrastructureModule {
  // ...
}
4.2. You can omit consumer, producer or schemaRegistry from a connection if you do not need them
import {
  KafkaClusterConfigDto,
  KafkaConsumerConfigDto,
  KafkaModule,
  KafkaProducerConfigDto,
  KafkaSchemaRegistryArgsConfigDto,
} from "@byndyusoft/nest-kafka";
import { Module } from "@nestjs/common";

import { ConfigDto } from "./config";

/**
 * Registers KafkaModule with a connection that configures only the cluster
 * and the consumer; `producer` and `schemaRegistry` are left out.
 */
@Module({
  imports: [
    // Extra modules
    // ...
    KafkaModule.registerAsync({
      inject: [ConfigDto],
      useFactory: (config: ConfigDto) => ({
        connections: [
          {
            cluster: KafkaClusterConfigDto.toRawConfig(config.kafka.cluster),
            consumer: KafkaConsumerConfigDto.toRawConfig(config.kafka.consumer),
          },
        ],
        // Arguments handed to the `topicPicker` callbacks.
        topicPickerArgs: [config],
      }),
    }),
    // ...
  ],
})
export class InfrastructureModule {
  // ...
}
5. Connect microservice to start consuming messages
import { KafkaConsumer, KafkaRetryConsumer } from "@byndyusoft/nest-kafka";
import { MicroserviceOptions } from "@nestjs/microservices";
/**
 * Application entry point: attaches the Kafka consumers as microservice
 * strategies, starts the HTTP listener and then starts all microservices.
 */
async function bootstrap(): Promise<void> {
  // ...

  app.connectMicroservice<MicroserviceOptions>({
    strategy: app.get(KafkaConsumer),
  });

  // you can optionally connect retry consumer
  app.connectMicroservice<MicroserviceOptions>({
    strategy: app.get(KafkaRetryConsumer),
  });

  // Put `app.listen(...)` before `app.startAllMicroservices()`
  await app.listen(...);
  await app.startAllMicroservices();

  // ...
}
// ...
Important
Put app.startAllMicroservices()
after your app.listen(...)
1. Create controller and use KafkaConsumerEventPattern
to describe consumer
import {
IKafkaConsumerPayload,
KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller } from "@nestjs/common";
import { Payload } from "@nestjs/microservices";
import { ConfigDto } from "~/src";
// Minimal consumer: a controller method bound to a topic chosen from the config.
@Controller()
export class UsersConsumer {
// `topicPicker` resolves the topic name from the config instead of hard-coding it.
@KafkaConsumerEventPattern({
topicPicker: (config: ConfigDto) => config.kafka.topic,
fromBeginning: true,
})
// Receives the consumer payload; no decoder is applied in this example.
public async onMessage(
@Payload() payload: IKafkaConsumerPayload,
): Promise<void> {
// ...
}
}
2. Decode payload
import {
IKafkaConsumerPayload,
KafkaConsumerEventPattern,
KafkaConsumerPayloadDecoder,
} from "@byndyusoft/nest-kafka";
import { Controller, UseInterceptors } from "@nestjs/common";
import { Payload } from "@nestjs/microservices";
import { ConfigDto } from "~/src";
import { UserDto } from "ᐸDtosᐳ";
// Consumer whose payload is decoded by an interceptor before the handler runs.
@Controller()
export class UsersConsumer {
@KafkaConsumerEventPattern({
topicPicker: (config: ConfigDto) => config.kafka.topic,
fromBeginning: true,
})
// Selects the decoder for each message part: string key, JSON value, string headers.
@UseInterceptors(
new KafkaConsumerPayloadDecoder({
key: "string",
value: "json",
headers: "string",
}),
)
// The payload type parameters reflect the decoded key (string) and value (UserDto).
public async onMessage(
@Payload() payload: IKafkaConsumerPayload<string, UserDto>,
): Promise<void> {
// ...
}
}
2.1. You can use param decorators to get key, value or headers
import {
IKafkaConsumerPayloadHeaders,
KafkaConsumerEventPattern,
KafkaConsumerPayloadDecoder,
KafkaHeaders,
KafkaKey,
KafkaValue,
} from "@byndyusoft/nest-kafka";
import { Controller, UseInterceptors } from "@nestjs/common";
import { ConfigDto } from "~/src";
import { UserDto } from "ᐸDtosᐳ";
// Consumer that takes the key, value and headers as separate handler parameters.
@Controller()
export class UsersConsumer {
@KafkaConsumerEventPattern({
topicPicker: (config: ConfigDto) => config.kafka.topic,
fromBeginning: true,
})
// Selects the decoder for each message part: string key, JSON value, string headers.
@UseInterceptors(
new KafkaConsumerPayloadDecoder({
key: "string",
value: "json",
headers: "string",
}),
)
// Each param decorator extracts one part of the payload instead of the whole object.
public async onMessage(
@KafkaKey() key: string,
@KafkaValue() value: UserDto,
@KafkaHeaders() headers: IKafkaConsumerPayloadHeaders,
): Promise<void> {
// ...
}
}
3. Always use some exception filter for correct error handling
import {
KafkaConsumerBaseExceptionFilter,
KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";
import { ConfigDto } from "~/src";
// Consumer skeleton showing where an exception filter is attached to the handler.
@Controller()
export class UsersConsumer {
@KafkaConsumerEventPattern({
topicPicker: (config: ConfigDto) => config.kafka.topic,
fromBeginning: true,
})
// Placeholder: pass one of the exception filters from this package here.
@UseFilters(/* ... */)
public async onMessage(): Promise<void> {
// Simulated failure, so the filter has something to handle.
throw new Error("some error");
}
}
3.1. Use KafkaConsumerBaseExceptionFilter
if you prefer Stop on error pattern
import {
KafkaConsumerBaseExceptionFilter,
KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";
import { ConfigDto } from "~/src";
// Consumer using the base exception filter ("Stop on error" pattern).
@Controller()
export class UsersConsumer {
@KafkaConsumerEventPattern({
topicPicker: (config: ConfigDto) => config.kafka.topic,
fromBeginning: true,
})
@UseFilters(new KafkaConsumerBaseExceptionFilter())
public async onMessage(): Promise<void> {
// Simulated failure, so the filter has something to handle.
throw new Error("some error");
}
}
3.2. Use KafkaConsumerErrorTopicExceptionFilter
if you prefer Dead letter queue pattern
import {
KafkaConsumerErrorTopicExceptionFilter,
KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";
import { ConfigDto } from "~/src";
// Consumer using the error-topic exception filter ("Dead letter queue" pattern).
@Controller()
export class UsersConsumer {
@KafkaConsumerEventPattern({
topicPicker: (config: ConfigDto) => config.kafka.topic,
fromBeginning: true,
})
// `errorTopicPicker` resolves the error topic name from the config.
@UseFilters(
new KafkaConsumerErrorTopicExceptionFilter({
errorTopicPicker: (config: ConfigDto) => config.kafka.errorTopic,
}),
)
public async onMessage(): Promise<void> {
// Simulated failure, so the filter has something to handle.
throw new Error("some error");
}
}
3.3. KafkaConsumerErrorTopicExceptionFilter
also supports a retry topic for retriable errors
import {
KafkaConsumerErrorTopicExceptionFilter,
KafkaConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";
import { ConfigDto } from "~/src";
// Consumer using the error-topic exception filter with both a retry topic and an error topic.
@Controller()
export class UsersConsumer {
@KafkaConsumerEventPattern({
topicPicker: (config: ConfigDto) => config.kafka.topic,
fromBeginning: true,
})
// NOTE(review): `config.kafka.retryTopic` is not declared in the KafkaConfigDto
// example; presumably it must be added to the config DTO and env mapping — confirm.
@UseFilters(
new KafkaConsumerErrorTopicExceptionFilter({
retryTopicPicker: (config: ConfigDto) => config.kafka.retryTopic,
errorTopicPicker: (config: ConfigDto) => config.kafka.errorTopic,
}),
)
public async onMessage(): Promise<void> {
// Simulated failure, so the filter has something to handle.
throw new Error("some error");
}
}
3.4. Use retry consumer to consume messages from retry topic
import {
  KafkaConsumerErrorTopicExceptionFilter,
  KafkaConsumerEventPattern,
  KafkaRetryConsumerEventPattern,
} from "@byndyusoft/nest-kafka";
import { Controller, UseFilters } from "@nestjs/common";

import { ConfigDto } from "~/src";

/**
 * Consumer bound to the retry topic through `KafkaRetryConsumerEventPattern`.
 */
@Controller()
export class UsersRetryConsumer {
  @KafkaRetryConsumerEventPattern({
    topicPicker: (config: ConfigDto) => config.kafka.retryTopic,
    fromBeginning: true,
  })
  @UseFilters(
    new KafkaConsumerErrorTopicExceptionFilter({
      // No retry topic is configured for messages that are already being retried.
      retryTopicPicker: false,
      errorTopicPicker: (config: ConfigDto) => config.kafka.errorTopic,
      resendHeadersPrefix: "retry",
    }),
  )
  public async onMessage(): Promise<void> {
    // Simulated failure, so the filter has something to handle.
    throw new Error("some error");
  }
}
Run the retry consumer, e.g. by HTTP:
import { KafkaRetryConsumer } from "@byndyusoft/nest-kafka";
import { ApiTags } from "@byndyusoft/nest-swagger";
import { Body, Controller, HttpCode, HttpStatus, Post } from "@nestjs/common";

import { ConfigDto } from "~/src";

import { ApiCommonResponses } from "../infrastructure";

import { RunDeliveryAppointmentsRetryConsumerOnceBodyDto } from "./dtos";
import { RunDeliveryAppointmentsRetryConsumerOnceUseCase } from "./useCases";

/**
 * HTTP controller that triggers the retry consumer on demand.
 */
@ApiTags("Users")
@Controller({
  path: "/users/retry",
  version: "1",
})
export class UsersRetryController {
  public constructor(
    private readonly config: ConfigDto,
    private readonly kafkaRetryConsumer: KafkaRetryConsumer,
  ) {}

  /**
   * Calls `runOnce` on the retry consumer for the configured retry topic
   * with `messagesCount: 1`.
   *
   * @returns a promise that resolves once `runOnce` has completed
   */
  @ApiCommonResponses(HttpStatus.BAD_REQUEST)
  @HttpCode(HttpStatus.NO_CONTENT)
  @Post("/runRetryConsumerOnce")
  public async runDeliveryAppointmentsRetryConsumerOnce(): Promise<void> {
    await this.kafkaRetryConsumer.runOnce({
      // The config is an injected instance member, so it is read through `this`.
      topic: this.config.kafka.retryTopic,
      messagesCount: 1,
    });
  }
}
1. Inject KafkaProducer
import { InjectKafkaProducer, KafkaProducer } from "@byndyusoft/nest-kafka";
import { Injectable } from "@nestjs/common";
// Service that receives the Kafka producer through constructor injection.
@Injectable()
export class UsersService {
public constructor(
// Injection decorator from this package for the KafkaProducer instance.
@InjectKafkaProducer()
private readonly __kafkaProducer: KafkaProducer,
) {}
}
1. Inject KafkaSchemaRegistry
import {
InjectKafkaSchemaRegistry,
KafkaSchemaRegistry,
} from "@byndyusoft/nest-kafka";
import { Injectable } from "@nestjs/common";
// Service that receives the Kafka schema registry client through constructor injection.
@Injectable()
export class UsersService {
public constructor(
// Injection decorator from this package for the KafkaSchemaRegistry instance.
@InjectKafkaSchemaRegistry()
private readonly __kafkaSchemaRegistry: KafkaSchemaRegistry,
) {}
}
This repository is released under version 2.0 of the Apache License.