Fastify plugin to interact with Apache Kafka, supporting Kafka producers and consumers.
To achieve the best performance, the plugin uses node-rdkafka
.
npm i @fastify/kafka
Plugin version | Fastify version |
---|---|
>=3.x |
^5.x |
>=0.x <3.x |
^4.x |
>=0.x <3.x |
^3.x |
>=0.x <3.x |
^2.x |
>=0.x <3.x |
^1.x |
Please note that if a Fastify version is out of support, then so are the corresponding versions of this plugin in the table above. See Fastify's LTS policy for more details.
const crypto = require('node:crypto')
const fastify = require('fastify')()
const group = crypto.randomBytes(20).toString('hex')
fastify
.register(require('@fastify/kafka'), {
producer: {
'metadata.broker.list': '127.0.0.1:9092',
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
'dr_cb': true
},
consumer: {
'metadata.broker.list': '127.0.0.1:9092',
'group.id': group,
'fetch.wait.max.ms': 10,
'fetch.error.backoff.ms': 50,
'auto.offset.reset': 'earliest'
}
})
fastify.post('/data', (req, reply) => {
fastify.kafka.push({
topic: 'updates',
payload: req.body,
key: 'dataKey'
})
})
fastify.kafka.subscribe('updates')
fastify.kafka.on('updates', (msg, commit) => {
console.log(msg.value.toString())
commit()
})
fastify.listen({ port: 3000 }, err => {
if (err) throw err
console.log(`server listening on ${fastify.server.address().port}`)
})
For more examples on how to use this plugin, you can take a look at the examples directory.
This module exposes the following APIs:
fastify.kafka.producer
: the producer instancefastify.kafka.push
: utility to produce a new message
fastify.kafka.consumer
: the consumer instancefastify.kafka.consume
: utility to start the message consumingfastify.kafka.subscribe
: utility to begin subscribing to one or more topicsfastify.kafka.on
: topic listener
This project is kindly sponsored by:
Licensed under MIT.